00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <iostream>
00028 #include <vector>
00029 #include <utility>
00030 #include <boost/mpi.hpp>
00031 #include <boost/serialization/vector.hpp>
00032 #include <boost/serialization/utility.hpp>
00033
00034 #include "gridpack/parallel/distributed.hpp"
00035
00036 #ifndef _shuffler_hpp_
00037 #define _shuffler_hpp_
00038
00039 namespace gridpack {
00040 namespace parallel {
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059 template <typename Thing, typename Index = int>
00060 class Shuffler
00061 : public Distributed,
00062 private utility::Uncopyable
00063 {
00064 public:
00065
00066
00067
00068
00069 typedef std::vector<Thing> ThingVector;
00070 typedef std::vector<Index> IndexVector;
00071
00072 Shuffler(const Communicator& comm)
00073 : Distributed(comm), utility::Uncopyable()
00074 {}
00075
00076 ~Shuffler(void) {}
00077
00078
00079 void operator()(ThingVector& locthings, const IndexVector& destproc)
00080 {
00081 BOOST_ASSERT(locthings.size() == destproc.size());
00082
00083 const boost::mpi::communicator& comm(this->communicator());
00084
00085 if (comm.size() <= 1) return;
00086
00087 size_t nthings(0);
00088 all_reduce(comm, locthings.size(), nthings, std::plus<size_t>());
00089 if (nthings <= 0) return;
00090
00091
00092
00093 ThingVector tvect(locthings);
00094 locthings.clear();
00095
00096
00097
00098 std::vector<ThingVector> tosend(comm.size());
00099
00100
00101
00102
00103 size_t locidx(0);
00104
00105 for (typename IndexVector::const_iterator dest = destproc.begin();
00106 dest != destproc.end(); ++dest) {
00107 if (*dest == comm.rank()) {
00108 locthings.push_back(tvect[locidx]);
00109 } else {
00110 tosend[*dest].push_back(tvect[locidx]);
00111 }
00112 locidx += 1;
00113 }
00114
00115 #if 0
00116 for (int src = 0; src < comm.size(); ++src) {
00117 ThingVector tmp;
00118 if (comm.rank() == src) {
00119 scatter(comm, tosend, tmp, src);
00120 } else {
00121 scatter(comm, tmp, src);
00122 }
00123 std::copy(tmp.begin(), tmp.end(), std::back_inserter(locthings));
00124 }
00125 }
00126 #else
00127 int me = comm.rank();
00128 int nprocs = comm.size();
00129 for (int src = 0; src < nprocs; ++src) {
00130
00131
00132
00133 std::vector<int> srcsizes(nprocs);
00134 std::vector<int> tsizes(nprocs);
00135 for (int i = 0; i<nprocs; i++) {
00136 if (src == me) {
00137 srcsizes[i] = tosend[i].size();
00138 } else {
00139 srcsizes[i] = 0;
00140 }
00141 }
00142 int ierr;
00143 ierr = MPI_Allreduce(&srcsizes[0],&tsizes[0],nprocs,MPI_INT,MPI_SUM,
00144 static_cast<MPI_Comm>(comm));
00145 if (ierr != 0) {
00146
00147 }
00148
00149
00150 ThingVector tmp;
00151 if (me == src) {
00152 for (int i=0; i<nprocs; i++) {
00153 if (i == me) {
00154
00155 tmp = tosend[i];
00156 } else {
00157
00158 if (tsizes[i] > 0)
00159 static_cast<boost::mpi::communicator>(comm).send(i,src,tosend[i]);
00160 }
00161 }
00162 } else {
00163
00164 if (tsizes[me] > 0)
00165 static_cast<boost::mpi::communicator>(comm).recv(src,src,tmp);
00166 }
00167 std::copy(tmp.begin(), tmp.end(), std::back_inserter(locthings));
00168 }
00169 }
00170 #endif
00171
00172 };
00173
00174
00175 }
00176 }
00177
00178
00179 #endif